<!DOCTYPE html>


<html lang="zh-CN">
  

    <head>
      <meta charset="utf-8" />
        
      <meta name="description" content="一个简单的博客" />
      
      <meta
        name="viewport"
        content="width=device-width, initial-scale=1"
      />
      <title>spark |  空唤晴</title>
  <meta name="generator" content="hexo-theme-ayer">
      
      <link rel="shortcut icon" href="/favicon.ico" />
       
<link rel="stylesheet" href="/dist/main.css">

      
<link rel="stylesheet" href="/css/fonts/remixicon.css">

      
<link rel="stylesheet" href="/css/custom.css">
 
      <script src="https://cdn.staticfile.org/pace/1.2.4/pace.min.js"></script>
       
 

      <link
        rel="stylesheet"
        href="https://cdn.jsdelivr.net/npm/@sweetalert2/theme-bulma@5.0.1/bulma.min.css"
      />
      <script src="https://cdn.jsdelivr.net/npm/sweetalert2@11.0.19/dist/sweetalert2.min.js"></script>

      <!-- mermaid -->
      
      <style>
        .swal2-styled.swal2-confirm {
          font-size: 1.6rem;
        }
      </style>
    <link rel="alternate" href="/atom.xml" title="空唤晴" type="application/atom+xml">
</head>

<body>
  <div id="app">
    
      <canvas class="fireworks"></canvas>
      <style>
        .fireworks {
          position: fixed;
          left: 0;
          top: 0;
          z-index: 99999;
          pointer-events: none;
        }
      </style>
      
      
    <main class="content on">
      <section class="outer">
  <article
  id="post-其它/分布式/spark"
  class="article article-type-post"
  itemscope
  itemprop="blogPost"
  data-scroll-reveal
>
  <div class="article-inner">
    
    <header class="article-header">
       
<h1 class="article-title sea-center" style="border-left:0" itemprop="name">
  spark
</h1>
 

      
    </header>
     
    <div class="article-meta">
      <a href="/2023/05/14/%E5%85%B6%E5%AE%83/%E5%88%86%E5%B8%83%E5%BC%8F/spark/" class="article-date">
  <time datetime="2023-05-14T03:01:41.000Z" itemprop="datePublished">2023-05-14</time>
</a> 
  <div class="article-category">
    <a class="article-category-link" href="/categories/%E5%85%B6%E5%AE%83/">其它</a> / <a class="article-category-link" href="/categories/%E5%85%B6%E5%AE%83/%E5%88%86%E5%B8%83%E5%BC%8F/">分布式</a>
  </div>
  
<div class="word_count">
    <span class="post-time">
        <span class="post-meta-item-icon">
            <i class="ri-quill-pen-line"></i>
            <span class="post-meta-item-text"> 字数统计:</span>
            <span class="post-count">3.9k</span>
        </span>
    </span>

    <span class="post-time">
        &nbsp; | &nbsp;
        <span class="post-meta-item-icon">
            <i class="ri-book-open-line"></i>
            <span class="post-meta-item-text"> 阅读时长≈</span>
            <span class="post-count">15 分钟</span>
        </span>
    </span>
</div>
 
    </div>
      
    <div class="tocbot"></div>




  
    <div class="article-entry" itemprop="articleBody">
       
  <h1 id="流处理器"><a href="#流处理器" class="headerlink" title="流处理器"></a>流处理器</h1><hr>
<h2 id="Spark-Streaming"><a href="#Spark-Streaming" class="headerlink" title="Spark Streaming"></a>Spark Streaming</h2><p>Spark 是一种快速、通用、可扩展的大数据分析引擎，已经发展成为一个包含多个子项目的集合。 Spark Streaming 是 Spark 的流处理部分。</p>
<p>Spark 的流处理是基于所谓微批处理的思想，把流处理看作是批处理的一种特殊形式，每次接收到一个时间间隔的数据才会去处理，所以天生很难在实时性上有所提升。</p>
<p>虽然在 Spark 2.3 中提出了连续处理模型（Continuous Processing Model），但是现在只支持很有限的功能，并不能在大的项目中使用，Spark 还需要做出很大的努力才能改进现有的流处理模型。想要在流处理的实时性上有所提升，就不能继续用微批处理的模式，而要想办法实现真正的流处理，即每当有一条数据输入就立刻处理，不做等待。</p>
<h3 id="数据类型"><a href="#数据类型" class="headerlink" title="数据类型"></a>数据类型</h3><p>在内部，每个数据块就是一个 RDD，所以 spark streaming 有 RDD 所有优点，处理速度快，容错性好，支持高度并行计算。</p>
<h3 id="操作流程"><a href="#操作流程" class="headerlink" title="操作流程"></a>操作流程</h3><p>第一，我们将Spark Streaming类名和StreamingContext的一些隐式转换导入到我们的环境中，以便将有用的方法添加到我们需要的其他类（如DStream）中。StreamingContext是所有流功能的主要入口点。我们创建一个带有两个执行线程的本地StreamingContext，批处理间隔为1秒。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title function_">main</span><span class="params">(String[] args)</span> <span class="keyword">throws</span> InterruptedException &#123;</span><br><span class="line">    <span class="comment">// 工作环境</span></span><br><span class="line">    <span class="type">SparkConf</span> <span class="variable">conf</span> <span class="operator">=</span> <span class="keyword">new</span> <span class="title class_">SparkConf</span>().setMaster(<span class="string">&quot;local[2]&quot;</span>).setAppName(<span class="string">&quot;NetworkWordCount&quot;</span>);     <span class="comment">// 定义双线程 / APP 名称</span></span><br><span class="line">    <span class="type">JavaStreamingContext</span> <span class="variable">jssc</span> <span class="operator">=</span> <span class="keyword">new</span> <span class="title class_">JavaStreamingContext</span>(conf, Durations.seconds(<span class="number">1</span>));          <span class="comment">// 定义批处理时间间隔 1s</span></span><br><span class="line">    <span class="comment">// 流创建（从源导入）</span></span><br><span class="line">    JavaReceiverInputDStream&lt;String&gt; lines = jssc.socketTextStream(<span class="string">&quot;localhost&quot;</span>, <span class="number">9999</span>);</span><br><span class="line">    <span class="comment">// 
流处理（数据分离、统计并打印）</span></span><br><span class="line">    JavaDStream&lt;String&gt; words = lines.flatMap(x -&gt; Arrays.asList(x.split(<span class="string">&quot; &quot;</span>)).iterator());    </span><br><span class="line">    JavaPairDStream&lt;String, Integer&gt; pairs = words.mapToPair(s -&gt; <span class="keyword">new</span> <span class="title class_">Tuple2</span>&lt;&gt;(s, <span class="number">1</span>));</span><br><span class="line">    JavaPairDStream&lt;String, Integer&gt; wordCounts = pairs.reduceByKey((i1, i2) -&gt; i1 + i2);</span><br><span class="line">    wordCounts.print();</span><br><span class="line">    <span class="comment">// 启动流运算</span></span><br><span class="line">    jssc.start();</span><br><span class="line">    jssc.awaitTermination();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>


<h4 id="DStream-对象"><a href="#DStream-对象" class="headerlink" title="DStream 对象"></a>DStream 对象</h4><p>Spark Streaming 提供一个对于流数据的抽象 DStream。DStream 可以由来自 Apache Kafka、Flume 或者 HDFS 中的流数据生成，也可以由别的 DStream 经过各种转换操作得来。</p>
<p>底层 DStream 也是由很多个序列化的 RDD 构成，按时间片（比如一秒）切分成的每个数据单位都是一个 RDD。然后，Spark 核心引擎将对 DStream 的 Transformation 操作变为针对 Spark 中对 RDD 的 Transformation 操作，将 RDD 经过操作变成中间结果保存在内存中。</p>
<p>由于 Spark Streaming 将底层的细节封装起来了，所以对于开发者来说，只需要操作 DStream 就行。接下来，让我们一起学习 DStream 的结构以及它支持的转换操作。</p>
<h4 id="StreamingContext-对象"><a href="#StreamingContext-对象" class="headerlink" title="StreamingContext 对象"></a>StreamingContext 对象</h4><p>任何 Spark Streaming 的程序都要首先创建一个 StreamingContext 的对象，它是所有 Streaming 操作的入口。StreamingContext 中最重要的参数是批处理的时间间隔，即把流数据细分成数据块的粒度。</p>
<p>用 <code>streamingContext.start()</code> 来开始接收数据并处理它<br>用 <code>streamingContext.awaitTermination()</code> 等待处理停止（手动停止或由于任何错误）<br>用 <code>streamingContext.stop()</code> 可以手动停止</p>
<p>一旦启动上下文，就无法设置新的流计算或将其添加到该流计算中<br>上下文一旦停止，就无法重新启动<br>一个JVM中只能同时激活一个StreamingContext<br>StreamingContext中的stop()也会停止SparkContext。但如果要仅停止StreamingContext的话，设置stop(false)<br>只要在创建下一个StreamingContext之前停止了上一个StreamingContext（不停止SparkContext），就可以将SparkContext重用于创建多个StreamingContext</p>
<h2 id="Spark-操作-kafka"><a href="#Spark-操作-kafka" class="headerlink" title="Spark 操作 kafka"></a>Spark 操作 kafka</h2><p>Spark Streaming提供了两类内置的streaming源：</p>
<p>Basic sources ：直接在StreamingContext API中可用的源。例如，文件系统和socket连接<br>Advanced sources ：像Kafka，Flume，Kinesis等这样的源，可通过额外的程序类获得 </p>
<h3 id="消费"><a href="#消费" class="headerlink" title="消费"></a>消费</h3><ol>
<li>先把数据接收过来，转换为spark streaming中的数据结构Dstream。接收数据的方式有两种：1.利用Receiver接收数据，2.直接从kafka读取数据。</li>
</ol>
<p>在spark1.3之后，引入了Direct方式。不同于Receiver的方式，Direct方式没有receiver这一层，其会周期性的获取Kafka中每个topic的每个partition中的最新offsets，之后根据设定的maxRatePerPartition来处理每个batch。</p>
<p>0.10以后只保留了direct模式(Reveiver模式不适合生产环境)，并且0.10版本API有变化（更加强大）</p>
<h3 id="生产"><a href="#生产" class="headerlink" title="生产"></a>生产</h3><p>与读数据不同，Spark并没有提供统一的接口用于写入Kafka，所以我们需要使用底层Kafka接口进行包装。<br>最直接的做法我们可以想到如下这种方式：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br></pre></td><td class="code"><pre><span class="line">input.foreachRDD(rdd =&gt;</span><br><span class="line">  <span class="comment">// 不能在这里创建KafkaProducer</span></span><br><span class="line">  rdd.foreachPartition(partition =&gt;</span><br><span class="line">    partition.foreach&#123;</span><br><span class="line">      <span class="keyword">case</span> x:String=&gt;&#123;</span><br><span class="line">        <span class="type">val</span> <span class="variable">props</span> <span class="operator">=</span> <span class="keyword">new</span> <span class="title class_">HashMap</span>[String, Object]()</span><br><span class="line">        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)</span><br><span class="line">        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,</span><br><span class="line">          <span class="string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span>)</span><br><span class="line">        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,</span><br><span class="line">          <span class="string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span>)</span><br><span class="line">        println(x)</span><br><span class="line">        <span class="type">val</span> <span class="variable">producer</span> <span 
class="operator">=</span> <span class="keyword">new</span> <span class="title class_">KafkaProducer</span>[String,String](props)</span><br><span class="line">        val message=<span class="keyword">new</span> <span class="title class_">ProducerRecord</span>[String, String](<span class="string">&quot;output&quot;</span>,<span class="literal">null</span>,x)</span><br><span class="line">        producer.send(message)</span><br><span class="line">      &#125;</span><br><span class="line">    &#125;</span><br><span class="line">  )</span><br><span class="line">)</span><br></pre></td></tr></table></figure>

<p>但是这种方式缺点很明显，对于每个partition的每条记录，我们都需要创建KafkaProducer，然后利用producer进行输出操作，注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边，因为KafkaProducer是不可序列化的（not serializable）。显然这种做法是不灵活且低效的，因为每条记录都需要建立一次连接。如何解决呢？</p>
<p>首先，我们需要将KafkaProducer利用lazy val的方式进行包装如下：</p>
<hr>
<h2 id="Flink"><a href="#Flink" class="headerlink" title="Flink"></a>Flink</h2><p>目前唯一同时支持高吞吐、低延迟、高性能的分布式流式数据处理框架。像Apache Spark也只能兼顾高吞吐和高性能特性，主要因为在Spark Streaming流式计算中无法做到低延迟保障。</p>
<p><strong>优势</strong></p>
<ol>
<li><p>支持事件时间（Event Time）概念。</p>
</li>
<li><p>支持有状态计算，保持了事件原本产生的时序性，避免网络传输带来的影响。</p>
</li>
<li><p>支持高度灵活的窗口操作，Flink将窗口分为Time、Count、Session以及Data-driven等类型的窗口操作，可以灵活地定制触发条件来达到对复杂的流传输模式的支持。</p>
</li>
<li><p>基于轻量级分布式快照实现容错，将大型计算任务的流程拆解成小的计算过程，分布到并行节点上处理。并通过 Checkpoints 将执行过程中的状态信息进行持久化存储，可以自动恢复出现异常的任务。</p>
</li>
<li><p>基于 JVM 实现独立的内存管理。</p>
</li>
</ol>
<p><strong>运行环境</strong></p>
<ul>
<li>JDK 版本必须在 1.8 及以上</li>
<li>Maven 版本必须在 3.0.4 及以上</li>
<li>Hadoop 环境支持 hadoop 2.4、2.6、2.7、2.8 等主要版本</li>
</ul>
<p>Flink 支持使用 Java/Scala 开发，以下示例代码全部使用 Java .</p>
<h3 id="基本组件"><a href="#基本组件" class="headerlink" title="基本组件"></a>基本组件</h3><ol>
<li><p>Flink 架构体系基本上分三层（自顶向下）：</p>
<ul>
<li><p><strong>API &amp; Libraries 层</strong>： 提供支撑流计算和批计算的接口，同时在此基础上抽象出不同的应用类型的组件库。</p>
</li>
<li><p><strong>Runtime 核心层</strong>：Flink分布式计算框架的核心实现层，负责分布式作业的执行、映射转换、任务调度等。将 DataStream 和 DataSet 转成统一的可执行的 Task Operator 。</p>
</li>
<li><p><strong>物理部署层</strong>：目前Flink支持本地、集群、云、容器部署，Flink通过该层能够支持不同平台的部署，用户可以根据需要选择使用对应的部署模式。</p>
</li>
</ul>
</li>
<li><p>Flink 基本架构</p>
<ul>
<li><p><strong>Client 客户端</strong>：负责将任务提交到集群，与JobManager构建Akka连接，然后将任务提交到JobManager，通过和JobManager之间进行交互获取任务执行状态。</p>
</li>
<li><p><strong>JobManager</strong>：负责整个Flink集群任务的调度以及资源的管理</p>
</li>
<li><p><strong>TaskManager</strong>：相当于整个集群的Slave节点，负责具体的任务执行和对应任务在每个节点上的资源申请与管理。</p>
</li>
</ul>
</li>
</ol>
<h3 id="编程模型"><a href="#编程模型" class="headerlink" title="编程模型"></a>编程模型</h3><ol>
<li> BasicTypeInfo 数据类型：支持任意 Java 原生基本类型或 String 类型。</li>
</ol>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line"><span class="comment">// 直接获取</span></span><br><span class="line">DataSource&lt;String&gt; inputStream= environment.fromElements(<span class="string">&quot;1&quot;</span>, <span class="string">&quot;2&quot;</span>, <span class="string">&quot;3&quot;</span>, <span class="string">&quot;4&quot;</span>, <span class="string">&quot;5&quot;</span>, <span class="string">&quot;6&quot;</span>);</span><br><span class="line"></span><br><span class="line"><span class="comment">// 从集合获取</span></span><br><span class="line">ArrayList&lt;String&gt; list = <span class="keyword">new</span> <span class="title class_">ArrayList</span>&lt;&gt;(list2);</span><br><span class="line">DataSource&lt;String&gt; inputStream= environment.fromCollection(list);</span><br></pre></td></tr></table></figure>

<ol start="2">
<li>TupleTypeInfo 数据类型：标识 Tuple 类型数据。</li>
</ol>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">DataSource&lt;Tuple2&gt; inputStreamTuple = environment.fromElements(<span class="keyword">new</span> <span class="title class_">Tuple2</span>(<span class="string">&quot;fangpc&quot;</span>, <span class="number">1</span>), <span class="keyword">new</span> <span class="title class_">Tuple2</span>(<span class="string">&quot;fangpengcheng&quot;</span>, <span class="number">2</span>));</span><br></pre></td></tr></table></figure>

<ol start="3">
<li>PojoTypeInfo 数据类型：描述任意的 POJOs ，字段类型必须是上述基础类型，拥有默认构造方法和 getter/setter 方法</li>
</ol>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line"><span class="type">var</span> <span class="variable">personStream</span> <span class="operator">=</span> environment.fromElements(<span class="keyword">new</span> <span class="title class_">Person</span>(<span class="string">&quot;fangpc&quot;</span>, <span class="number">24</span>), <span class="keyword">new</span> <span class="title class_">Person</span>(<span class="string">&quot;fangpengcheng&quot;</span>, <span class="number">25</span>));</span><br></pre></td></tr></table></figure>


<ol start="4">
<li><p>Value 数据类型：实现了org.apache.flink.types.Value，其中包括 read() 和 write() 两个方法完成序列化和反序列化操作，有着比较高效的性能。Flink 提供的内建 Value 类型有 IntValue、DoubleValue、StringValue 等。</p>
</li>
<li><p>特殊数据类型：</p>
<ul>
<li>Scala中的List、Map、Either、Option、Try数据类型</li>
<li>Java中Either</li>
<li>Hadoop的Writable数据类型</li>
</ul>
</li>
</ol>
<h3 id="操作流程-1"><a href="#操作流程-1" class="headerlink" title="操作流程"></a>操作流程</h3><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">public</span> <span class="keyword">class</span> <span class="title class_">StreamingJob</span> &#123;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title function_">main</span><span class="params">(String[] args)</span> <span class="keyword">throws</span> Exception &#123;</span><br><span class="line">        <span class="comment">// 创建执行环境</span></span><br><span class="line">        <span class="keyword">final</span> <span class="type">StreamExecutionEnvironment</span> <span class="variable">env</span> <span class="operator">=</span> StreamExecutionEnvironment.getExecutionEnvironment();</span><br><span class="line"></span><br><span class="line">        <span class="comment">/******** 配置流过程 *********/</span></span><br><span class="line">        addSource();         <span class="comment">// 流创建</span></span><br><span class="line"></span><br><span class="line">                             <span class="comment">// 流转化</span></span><br><span class="line">        <span class="comment">/******** 配置流过程 *********/</span></span><br><span class="line"></span><br><span class="line">        <span 
class="comment">// 执行</span></span><br><span class="line">        env.execute(<span class="string">&quot;Flink Streaming Java API Skeleton&quot;</span>);</span><br><span class="line"></span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>

<p><strong>DataStreamSource 对象</strong>是 DataStream 类的子类，代表着数据流的起始点。必须通过 addSource 方法生成, fromCollection/readTextFile 方法底层也会调用 addSource 方法。</p>
<p><strong>DataStream 对象</strong>代表着相同类型元素的流，可以通过转换（transformation）来实现转换为另一个 DataStream 对象。DataStrem 对象内部持有当前的 StreamExecutionEnvironment 对象和 DataTransformation 对象。</p>
<p><strong>StreamExecutionEnvironment 对象</strong>代表着当前流计算执行环境以及相关配置。每个 DataStream 类在做转换的时候，会首先创建转换对应的 DataTransformation 对象，最终形成一个 DataTransformation 链表被 StreamExecutionEnvironment 对象维护。</p>
<blockquote>
<p>Flink 在执行时，会把流拓扑（Source、Transformation、Sink）都转换为 DataFlow：由 Stream 和 Operator 组成，让 Stream在 Operator 中流动。</p>
</blockquote>
<h3 id="一致性"><a href="#一致性" class="headerlink" title="一致性"></a>一致性</h3><p>当在分布式系统中引入状态时，自然也引入了一致性问题。</p>
<p>在流处理中，一致性分为 3 个级别。</p>
<ul>
<li><p><strong>at-most-once</strong>：故障发生之后，计数结果可能丢失。</p>
</li>
<li><p><strong>at-least-once</strong>：这表示计数结果可能大于正确值，但绝不会小于正确值。也就是说，计数程序在发生故障后可能多算，但是绝不会少算。</p>
</li>
<li><p><strong>exactly-once</strong>：这指的是系统保证在发生故障后得到的计数结果与正确值一致。</p>
</li>
</ul>
<p>第一代流处理器（如 Storm 和 Samza）刚问世时只保证 at-least-once。最先保证 exactly-once 的系统（Storm Trident 和 Spark Streaming）在性能和表现力这两个方面付出了很大的代价。</p>
<hr>
<h2 id="Flink-操作-kafka"><a href="#Flink-操作-kafka" class="headerlink" title="Flink 操作 kafka"></a>Flink 操作 kafka</h2><p><a target="_blank" rel="noopener" href="https://zhuanlan.zhihu.com/p/92289771">https://zhuanlan.zhihu.com/p/92289771</a></p>
<p>flink 提供了一个特有的 kafka connector 去读写 kafka topic 的数据。这样在 flink 消费 kafka 数据时，就可以通过 flink 内部去跟踪 offset 和设定 checkpoint 去实现 exactly-once 的语义。</p>
<p>在 Flink 中，我们作为 Consumer 时需要用 Source Connectors 代表连接数据源的连接器，作为 Producer 时需要用 Sink Connector 代表连接数据输出的连接器。</p>
<h3 id="Source-Connector"><a href="#Source-Connector" class="headerlink" title="Source Connector"></a>Source Connector</h3><p>Flink Kafka connector 以并行的方式读入事件流，每个并行的 source task 都可以从一个或多个 partition 读入数据。Task 对于每个它当前正在读的 partition 都会追踪当前的 offset ，并将这些 offset 数据存储到它的 checkpoint 中。当发生故障进行恢复时，offset 被取出并重置，使得数据可以在上次检查点时的 offset 继续读数据。<em>Flink Kafka connector 并不依赖于 Kafka 本身的 offset-tracking 机制（也就是consumer groups机制）。</em></p>
<p><img src="/pic/sources.png" alt="source"></p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br></pre></td><td class="code"><pre><span class="line"><span class="type">StreamExecutionEnvironment</span> <span class="variable">env</span> <span class="operator">=</span> StreamExecutionEnvironment.getExecutionEnvironment()</span><br><span class="line"></span><br><span class="line"><span class="comment">// 设定配置</span></span><br><span class="line"><span class="type">val</span> <span class="variable">properties</span> <span class="operator">=</span> <span class="keyword">new</span> <span class="title class_">Properties</span>();</span><br><span class="line">properties.setProperty(<span class="string">&quot;bootstrap.servers&quot;</span>, <span class="string">&quot;localhost:9092&quot;</span>);</span><br><span class="line">properties.setProperty(<span class="string">&quot;group.id&quot;</span>, <span class="string">&quot;test&quot;</span>);</span><br><span class="line"></span><br><span class="line"><span class="comment">// 设置消费者并添加源</span></span><br><span class="line">FlinkKafkaConsumer&lt;String&gt; myConsumer = <span class="keyword">new</span> <span class="title class_">FlinkKafkaConsumer</span>&lt;String&gt;(</span><br><span class="line">    <span class="string">&quot;topic&quot;</span>,                                 </span><br><span class="line">    <span class="keyword">new</span> <span class="title class_">SimpleStringSchema</span>(),                </span><br><span class="line">    properties))   </span><br><span class="line">);</span><br><span 
class="line">DataStream[String] stream = env.addSource(myConsumer);                    </span><br></pre></td></tr></table></figure>

<p><strong>初始化 FlinkKafkaConsumer 参数</strong></p>
<ol>
<li><strong>topic 名字</strong>，用来指定消费一个或者多个topic的数据，也可以是正则表达式。</li>
<li><strong>反序列化器(schema)</strong>，对消费数据进行反序列化，转换成自定义的数据结构。</li>
<li><strong>kafka 配置信息</strong>：如 zk 地址端口，kafka 地址端口等。此对象至少要包含两个条目 <code>bootstrap.servers</code> 与 <code>group.id</code>。</li>
</ol>
<p>反序列化器主要通过实现 KeyedDeserializationSchema 或者 DeserializationSchema 接口来完成，flink 内置，也可以自定义。</p>
<ul>
<li>转化为 String 类型 <code>SimpleStringSchema</code></li>
<li>转化为其它类型 <code>TypeInformationSerializationSchema&lt;T&gt;</code></li>
<li>转化为键值对类型 <code>TypeInformationKeyValueSerializationSchema&lt;K, V&gt;</code></li>
<li>转化为 JSON 类型 <code>JSONKeyValueDeserializationSchema</code></li>
</ul>
<p><strong>消费起始位置</strong></p>
<p>Flink Kafka Consumer 可以配置指定的 Kafka Partition 的起始位置。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line">myConsumer.setStartFromEarliest()             <span class="comment">// start from the earliest record possible</span></span><br><span class="line">myConsumer.setStartFromLatest()               <span class="comment">// start from the latest record</span></span><br><span class="line">myConsumer.setStartFromTimestamp(...)         <span class="comment">// start from specified epoch timestamp (milliseconds)</span></span><br><span class="line">myConsumer.setStartFromGroupOffsets()         <span class="comment">// the default behaviour</span></span><br></pre></td></tr></table></figure>



<h3 id="Sink-Connector"><a href="#Sink-Connector" class="headerlink" title="Sink Connector"></a>Sink Connector</h3><p>Flink 提供为 Kafka 0.8 版本后所有 Kafka 版本的 sink connectors。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line"><span class="comment">// 设定数据流</span></span><br><span class="line">DataStream[String] stream = environment.fromElements(<span class="string">&quot;1&quot;</span>, <span class="string">&quot;2&quot;</span>, <span class="string">&quot;3&quot;</span>, <span class="string">&quot;4&quot;</span>, <span class="string">&quot;5&quot;</span>, <span class="string">&quot;6&quot;</span>);</span><br><span class="line"> </span><br><span class="line"><span class="comment">// 设置生产者并添加到 sink</span></span><br><span class="line">FlinkKafkaProducer&lt;String&gt; myProducer = <span class="keyword">new</span> <span class="title class_">FlinkKafkaProducer</span>&lt;String&gt;(</span><br><span class="line">  <span class="string">&quot;localhost:9092&quot;</span>,                            </span><br><span class="line">  <span class="string">&quot;topic&quot;</span>,                                     </span><br><span class="line">  <span class="keyword">new</span> <span class="title class_">SimpleStringSchema</span>)                    </span><br><span class="line"> </span><br><span class="line">stream.addSink(myProducer)</span><br></pre></td></tr></table></figure>

<p><strong>初始化 FlinkKafkaProducer 参数</strong></p>
<ol>
<li><strong>broker 列表</strong>，要发往的 brokers , 用逗号分割。</li>
<li><strong>topic 名字</strong>，用来指定生产一个或者多个 topic 的数据，也可以是正则表达式。</li>
<li><strong>序列化器(schema)</strong>，对消费数据进行序列化，将目标类型转换成字节数组。</li>
</ol>
<p>序列化器类比于反序列化器实现：</p>
<ul>
<li>转化为 String 类型 <code>SimpleStringSchema</code></li>
<li>转化为其它类型 <code>TypeInformationSerializationSchema&lt;T&gt;</code></li>
<li>转化为键值对类型 <code>TypeInformationKeyValueSerializationSchema&lt;K, V&gt;</code></li>
<li>转化为 JSON 类型 <code>JSONKeyValueDeserializationSchema</code></li>
</ul>
<h3 id="Kakfa-容错机制"><a href="#Kakfa-容错机制" class="headerlink" title="Kakfa 容错机制"></a>Kakfa 容错机制</h3><p>在 Kafka 0.9 之前不提供任何机制去保证 at-least-once 或 exactly-once 的语义。 但后续版本的 Kafka 可以通过以下方式来实现出错后恢复且不丢失数据：</p>
<ol>
<li><strong>启用 Checkpoint</strong></li>
</ol>
<p>在默认启用 Checkpoint 的状况下， FlinkKafkaConsumer 将消费来自 Topic 的记录，并以一致的方式周期性地 Checkpoint 其所有 Kafka Offset 以及其它操作的状态。万一作业失败，Flink 将把流式程序恢复到最新 Checkpoint的状态，并且重新消费 Kafka 中的数据。这确保了在 Kafka Broker 中所提交的 Offset 和 Checkpointed State 中的 Offset 是一致的。此时 FlinkKafkaProducer 可以提供 exactly-once 的投递语义。</p>
<p>如果 Checkpointing 没有启用，KafkaFlinkConsumer 将会周期性的提交 Offset 到 Zookeeper 中去。</p>
<ul>
<li><strong>配置 Semantic 参数</strong></li>
</ul>
<p>除了启用 Flink 的 Checkpointing，还可以通过传递恰当的 semantic 参数给 FlinkKafkaProducer 选择 3 种不同的操作模式:</p>
<ul>
<li><code>Semantic.NONE</code> : Flink 什么也不会保证，所产生的记录可能会被丢失或者重复。</li>
<li><code>Semantic.AT_LEAST_ONCE</code>（默认）: Flink 保证 at-least-once ，没有记录会被丢失，但可能会重复。</li>
<li><code>Semantic.EXACTLY_ONCE</code> : 使用 Kafka 的事务机制来保证 exactly-once。</li>
</ul>
<p>Semantic.EXACTLY_ONCE 模式依赖于提交事务的能力，这些事务是在 taking a checkpoint 之前，从该 Checkpoint 恢复之后启动的。如果 Flink 应用崩溃且完成重启的时间比 Kafka 事务超时的时间大，则数据将会丢失（Kafka 将自动的终止超过超时时间的事务）。请务必根据预期的故障时间来配置你的事务超时。</p>
<h3 id="kafka-分区发现"><a href="#kafka-分区发现" class="headerlink" title="kafka 分区发现"></a>kafka 分区发现</h3><p>FlinkKafkaConsumer 支持发现动态创建的 Kafka Partition，并且以 exactly-once 语义保证来消费其中的数据。默认情况下分区发现是禁用的，要启用该特性在提供的属性配置中为参数 flink.partition-discovery.interval-millis 设置一个非负数的值，表示发现间隔（以毫秒为单位）。</p>
 
      <!-- reward -->
      
      <div id="reword-out">
        <div id="reward-btn">
          打赏
        </div>
      </div>
      
    </div>
    

    <!-- copyright -->
    
    <div class="declare">
      <ul class="post-copyright">
        <li>
          <i class="ri-copyright-line"></i>
          <strong>版权声明： </strong>
          
          本博客所有文章除特别声明外，著作权归作者所有。转载请注明出处！
          
        </li>
      </ul>
    </div>
    
    <footer class="article-footer">
       
<div class="share-btn">
      <span class="share-sns share-outer">
        <i class="ri-share-forward-line"></i>
        分享
      </span>
      <div class="share-wrap">
        <i class="arrow"></i>
        <div class="share-icons">
          
          <a class="weibo share-sns" href="javascript:;" data-type="weibo">
            <i class="ri-weibo-fill"></i>
          </a>
          <a class="weixin share-sns wxFab" href="javascript:;" data-type="weixin">
            <i class="ri-wechat-fill"></i>
          </a>
          <a class="qq share-sns" href="javascript:;" data-type="qq">
            <i class="ri-qq-fill"></i>
          </a>
          <a class="douban share-sns" href="javascript:;" data-type="douban">
            <i class="ri-douban-line"></i>
          </a>
          <!-- <a class="qzone share-sns" href="javascript:;" data-type="qzone">
            <i class="icon icon-qzone"></i>
          </a> -->
          
          <a class="facebook share-sns" href="javascript:;" data-type="facebook">
            <i class="ri-facebook-circle-fill"></i>
          </a>
          <a class="twitter share-sns" href="javascript:;" data-type="twitter">
            <i class="ri-twitter-fill"></i>
          </a>
          <a class="google share-sns" href="javascript:;" data-type="google">
            <i class="ri-google-fill"></i>
          </a>
        </div>
      </div>
</div>

<div class="wx-share-modal">
    <a class="modal-close" href="javascript:;"><i class="ri-close-circle-line"></i></a>
    <p>扫一扫，分享到微信</p>
    <div class="wx-qrcode">
      <img src="https://api.qrserver.com/v1/create-qr-code/?size=150x150&data=https://xlw686.github.io/2023/05/14/%E5%85%B6%E5%AE%83/%E5%88%86%E5%B8%83%E5%BC%8F/spark/" alt="微信分享二维码">
    </div>
</div>

<div id="share-mask"></div>  
  <ul class="article-tag-list" itemprop="keywords"><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/yan/" rel="tag">yan</a></li><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/%E5%85%B6%E5%AE%83/" rel="tag">其它</a></li></ul>

    </footer>
  </div>

   
  <nav class="article-nav">
    
      <a href="/2023/05/14/%E5%85%B6%E5%AE%83/%E5%88%86%E5%B8%83%E5%BC%8F/rpc/" class="article-nav-link">
        <strong class="article-nav-caption">上一篇</strong>
        <div class="article-nav-title">
          
            RPC
          
        </div>
      </a>
    
    
      <a href="/2023/05/14/%E5%85%B6%E5%AE%83/%E5%88%86%E5%B8%83%E5%BC%8F/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/" class="article-nav-link">
        <strong class="article-nav-caption">下一篇</strong>
        <div class="article-nav-title">消息队列</div>
      </a>
    
  </nav>

  
   
  
   
    <script src="https://cdn.staticfile.org/twikoo/1.4.18/twikoo.all.min.js"></script>
    <div id="twikoo" class="twikoo"></div>
    <script>
        twikoo.init({
            envId: ""
        })
    </script>
 
</article>

</section>
      <footer class="footer">
  <div class="outer">
    <ul>
      <li>
        Copyrights &copy;
        2023
        <i class="ri-heart-fill heart_icon"></i> 空唤晴
      </li>
    </ul>
    <ul>
      <li>
        
      </li>
    </ul>
    <ul>
      <li>
        
        
        <span>
  <span><i class="ri-user-3-fill"></i>访问人数:<span id="busuanzi_value_site_uv"></span></span>
  <span class="division">|</span>
  <span><i class="ri-eye-fill"></i>浏览次数:<span id="busuanzi_value_page_pv"></span></span>
</span>
        
      </li>
    </ul>
    <ul>
      
    </ul>
    <ul>
      
    </ul>
    <ul>
      <li>
        <!-- cnzz统计 -->
        
      </li>
    </ul>
  </div>
</footer>    
    </main>
    <div class="float_btns">
      <div class="totop" id="totop">
  <i class="ri-arrow-up-line"></i>
</div>

<div class="todark" id="todark">
  <i class="ri-moon-line"></i>
</div>

    </div>
    <aside class="sidebar on">
      <button class="navbar-toggle"></button>
<nav class="navbar">
  
  <div class="logo">
    <a href="/"><img src="/images/ayer-side.svg" alt="空唤晴"></a>
  </div>
  
  <ul class="nav nav-main">
    
    <li class="nav-item">
      <a class="nav-item-link" href="/">主页</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/archives">归档</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/categories">分类</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/tags">标签</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/friends">友链</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/2023/01/01/about">关于我</a>
    </li>
    
  </ul>
</nav>
<nav class="navbar navbar-bottom">
  <ul class="nav">
    <li class="nav-item">
      
      <a class="nav-item-link nav-item-search"  title="搜索">
        <i class="ri-search-line"></i>
      </a>
      
      
      <a class="nav-item-link" target="_blank" href="/atom.xml" title="RSS Feed" rel="noopener">
        <i class="ri-rss-line"></i>
      </a>
      
    </li>
  </ul>
</nav>
<div class="search-form-wrap">
  <div class="local-search local-search-plugin">
  <input type="search" id="local-search-input" class="local-search-input" placeholder="Search..." aria-label="Search">
  <div id="local-search-result" class="local-search-result"></div>
</div>
</div>
    </aside>
    <div id="mask"></div>

<!-- #reward -->
<div id="reward">
  <span class="close"><i class="ri-close-line"></i></span>
  <p class="reward-p"><i class="ri-cup-line"></i>请我喝杯咖啡吧~</p>
  <div class="reward-box">
    
    <div class="reward-item">
      <img class="reward-img" src="/images/alipay.jpg" alt="支付宝收款二维码">
      <span class="reward-type">支付宝</span>
    </div>
    
    
    <div class="reward-item">
      <img class="reward-img" src="/images/wechat.jpg" alt="微信收款二维码">
      <span class="reward-type">微信</span>
    </div>
    
  </div>
</div>
    
<script src="/js/jquery-3.6.0.min.js"></script>
 
<script src="/js/lazyload.min.js"></script>

<!-- Tocbot -->
 
<script src="/js/tocbot.min.js"></script>

<script>
  // Tocbot: build the floating table of contents from the article headings.
  const tocConfig = {
    tocSelector: ".tocbot",
    contentSelector: ".article-entry",
    headingSelector: "h1, h2, h3, h4, h5, h6",
    hasInnerContainers: true,
    scrollSmooth: true,
    scrollContainer: "main",
    positionFixedSelector: ".tocbot",
    positionFixedClass: "is-position-fixed",
    fixedSidebarOffset: "auto",
  };
  tocbot.init(tocConfig);
</script>

<script src="https://cdn.staticfile.org/jquery-modal/0.9.2/jquery.modal.min.js"></script>
<link
  rel="stylesheet"
  href="https://cdn.staticfile.org/jquery-modal/0.9.2/jquery.modal.min.css"
/>
<script src="https://cdn.staticfile.org/justifiedGallery/3.8.1/js/jquery.justifiedGallery.min.js"></script>

<script src="/dist/main.js"></script>

<!-- ImageViewer -->
 <!-- Root element of PhotoSwipe. Must have class pswp. -->
<div class="pswp" tabindex="-1" role="dialog" aria-hidden="true">

    <!-- Background of PhotoSwipe. 
         It's a separate element as animating opacity is faster than rgba(). -->
    <div class="pswp__bg"></div>

    <!-- Slides wrapper with overflow:hidden. -->
    <div class="pswp__scroll-wrap">

        <!-- Container that holds slides. 
            PhotoSwipe keeps only 3 of them in the DOM to save memory.
            Don't modify these 3 pswp__item elements, data is added later on. -->
        <div class="pswp__container">
            <div class="pswp__item"></div>
            <div class="pswp__item"></div>
            <div class="pswp__item"></div>
        </div>

        <!-- Default (PhotoSwipeUI_Default) interface on top of sliding area. Can be changed. -->
        <div class="pswp__ui pswp__ui--hidden">

            <div class="pswp__top-bar">

                <!--  Controls are self-explanatory. Order can be changed. -->

                <div class="pswp__counter"></div>

                <button class="pswp__button pswp__button--close" type="button" title="Close (Esc)"></button>

                <button class="pswp__button pswp__button--share" type="button" style="display:none" title="Share"></button>

                <button class="pswp__button pswp__button--fs" type="button" title="Toggle fullscreen"></button>

                <button class="pswp__button pswp__button--zoom" type="button" title="Zoom in/out"></button>

                <!-- Preloader demo http://codepen.io/dimsemenov/pen/yyBWoR -->
                <!-- element will get class pswp__preloader--active when preloader is running -->
                <div class="pswp__preloader">
                    <div class="pswp__preloader__icn">
                        <div class="pswp__preloader__cut">
                            <div class="pswp__preloader__donut"></div>
                        </div>
                    </div>
                </div>
            </div>

            <div class="pswp__share-modal pswp__share-modal--hidden pswp__single-tap">
                <div class="pswp__share-tooltip"></div>
            </div>

            <button class="pswp__button pswp__button--arrow--left" type="button" title="Previous (arrow left)">
            </button>

            <button class="pswp__button pswp__button--arrow--right" type="button" title="Next (arrow right)">
            </button>

            <div class="pswp__caption">
                <div class="pswp__caption__center"></div>
            </div>

        </div>

    </div>

</div>

<link rel="stylesheet" href="https://cdn.staticfile.org/photoswipe/4.1.3/photoswipe.min.css">
<link rel="stylesheet" href="https://cdn.staticfile.org/photoswipe/4.1.3/default-skin/default-skin.min.css">
<script src="https://cdn.staticfile.org/photoswipe/4.1.3/photoswipe.min.js"></script>
<script src="https://cdn.staticfile.org/photoswipe/4.1.3/photoswipe-ui-default.min.js"></script>

<script>
    // PhotoSwipe image viewer: clicking any image in the article body opens
    // a full-screen gallery containing all of the article's images, starting
    // at the clicked one. Reward QR-code images are excluded.
    function viewer_init() {
        let pswpElement = document.querySelectorAll('.pswp')[0];
        let $imgArr = document.querySelectorAll('.article-entry img:not(.reward-img)')

        $imgArr.forEach(($em, i) => {
            $em.onclick = () => {
                // Do nothing while the slider sidebar is expanded.
                // TODO: track this as explicit state instead of a DOM query.
                if (document.querySelector('.left-col.show')) return
                let items = []
                $imgArr.forEach(($em2) => {
                    let src = $em2.getAttribute('data-target') || $em2.getAttribute('src')
                    let title = $em2.getAttribute('alt')
                    // Probe the natural image size. If the image is not yet in
                    // the browser cache, width/height read 0, so fall back to
                    // the rendered element size.
                    const image = new Image()
                    image.src = src
                    items.push({
                        src: src,
                        w: image.width || $em2.width,
                        h: image.height || $em2.height,
                        title: title
                    })
                })
                var gallery = new PhotoSwipe(pswpElement, PhotoSwipeUI_Default, items, {
                    index: i
                });
                gallery.init()
            }
        })
    }
    viewer_init()
</script> 
<!-- MathJax -->

<!-- Katex -->

<!-- busuanzi  -->
 
<script src="/js/busuanzi-2.3.pure.min.js"></script>
 
<!-- ClickLove -->

<!-- ClickBoom1 -->

<script src="https://cdn.staticfile.org/animejs/3.2.1/anime.min.js"></script>

<script src="/js/clickBoom1.js"></script>
 
<!-- ClickBoom2 -->

<!-- CodeCopy -->
 
<link rel="stylesheet" href="/css/clipboard.css">
 <script src="https://cdn.staticfile.org/clipboard.js/2.0.10/clipboard.min.js"></script>
<script>
  // Code-block copy buttons: inject a COPY button before each code block,
  // wire it to clipboard.js, and flash success/failure feedback for 2 s.

  // Thin alias around setTimeout used for the "restore the button" delay.
  // NOTE: the delay is in milliseconds (the original parameter was
  // misleadingly named `seconds` while being passed 2000 ms).
  function wait(callback, delayMs) {
    window.setTimeout(callback, delayMs);
  }
  (function () {
    var initCopyCode = function () {
      var copyHtml = '';
      copyHtml += '<button class="btn-copy" data-clipboard-snippet="">';
      copyHtml += '<i class="ri-file-copy-2-line"></i><span>COPY</span>';
      copyHtml += '</button>';
      // Two selectors cover both Hexo's highlight markup and plain <pre><code>.
      // NOTE(review): a block matching both selectors would get two buttons —
      // confirm against the theme's generated markup.
      $(".highlight .code pre").before(copyHtml);
      $(".article pre code").before(copyHtml);
      var clipboard = new ClipboardJS('.btn-copy', {
        // Copy the element immediately after the button: the code block itself.
        target: function (trigger) {
          return trigger.nextElementSibling;
        }
      });
      clipboard.on('success', function (e) {
        let $btn = $(e.trigger);
        $btn.addClass('copied');
        let $icon = $($btn.find('i'));
        $icon.removeClass('ri-file-copy-2-line');
        $icon.addClass('ri-checkbox-circle-line');
        let $span = $($btn.find('span'));
        $span[0].innerText = 'COPIED';

        wait(function () { // restore the default look after two seconds
          $icon.removeClass('ri-checkbox-circle-line');
          $icon.addClass('ri-file-copy-2-line');
          $span[0].innerText = 'COPY';
        }, 2000);
      });
      clipboard.on('error', function (e) {
        e.clearSelection();
        let $btn = $(e.trigger);
        $btn.addClass('copy-failed');
        let $icon = $($btn.find('i'));
        $icon.removeClass('ri-file-copy-2-line');
        $icon.addClass('ri-time-line');
        let $span = $($btn.find('span'));
        $span[0].innerText = 'COPY FAILED';

        wait(function () { // restore the default look after two seconds
          $icon.removeClass('ri-time-line');
          $icon.addClass('ri-file-copy-2-line');
          $span[0].innerText = 'COPY';
        }, 2000);
      });
    }
    initCopyCode();
  })();
</script>
 
<!-- CanvasBackground -->
 
<script src="/js/dz.js"></script>
 
<script>
  // Initialise mermaid diagrams with the "forest" theme, but only when the
  // mermaid library has actually been loaded on this page.
  window.mermaid && mermaid.initialize({ theme: "forest" });
</script>


    
    

  </div>
</body>

</html>